Skip to content

Conversation

@stan-r-ds
Copy link
Contributor

@stan-r-ds stan-r-ds commented Dec 29, 2025

📋 Type of the Changes

  • Breaking change
  • Non-breaking change
  • Bug fix / minor change

🛠 Changes being made

Added streamx commands for StreamX 2.0 (and removed the original run/stream/batch):

  • run_v2
  • stream_v2
  • batch_v2

Removed streamx commands:

  • publish and unpublish: permanently, as discussed
  • cloud, dev and init: removed for now, since adjusting them to StreamX 2.0 was not part of the task, but need the whole codebase to compile and pass tests

✅ Checklist

  • My code follows the code standards of this project
  • Changed code is covered with unit tests
  • I have updated READMEs and java docs (if applicable)

…and remove streamx publish/unpublish/cloud/dev/init commands for now)
@stan-r-ds stan-r-ds requested a review from a team as a code owner December 29, 2025 13:34
uses: actions/setup-java@v3
with:
java-version: '17'
java-version: '21'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The builds fail on java 17 when importing com.streamx ingestion client that's built with java 21

@Override
public Map<String, String> createSubstitutionVariables(String payloadPath,
String channel, String relativePath) {
String eventType, String eventSource, String relativePath) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the eventsource.yaml files:

  • channel is now replaced with eventType
  • and eventSource is added to them, as a mandatory field in CloudEvents API

private final Map<String, Publisher<JsonNode>> publishersCache = new HashMap<>();
private final Map<String, String> schemaTypesCache = new HashMap<>();
// TODO "_v2" is a temporary postfix for now
public static final String COMMAND_NAME = "batch_v2";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added those postfixes instead of adding an additional experimental word in the command, to not break the current limitations that each command has a single word name

@Inject
IngestionMessageJsonFactory ingestionMessageJsonFactory;

private State state;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now a method parameter instead of a mutable field

if (publisher == null) {
publisher = client.newPublisher(getChannel(), JsonNode.class);
publishersCache.put(getChannel(), publisher);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now a single publisher

@ConfigProperty(name = "streamx.cli.e2e.web.delivery.url", defaultValue = "http://localhost:8087/")
String webDeliveryPortUrl;
@ConfigProperty(name = "streamx.cli.e2e.nginx.url", defaultValue = "http://localhost:8089/overridden/")
String nginxPortUrl;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make things simplier maybe we don't need to test both web sink and nginx in CLI - this is rather the responsibility of streamx-service-mesh/runner (and its tested there)

CLI_SHORT_TIMEOUT_IN_SEC)
);
String url = webDeliveryPortUrl + resourcePath;
httpValidator.validate(url, expectedStatusCode, expectedBody, CLI_TIMEOUT);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing just in the Web Sink is enough 💭

@@ -0,0 +1 @@
<h1>Hello World!</h1>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...could have empty content, since it's for unpublishing - but still the file must exist to be picked up

pom.xml Outdated
<streamx.version>1.0.16</streamx.version>
<ingestion-client.version>1.1.1</ingestion-client.version>
<streamx-operator.version>0.0.16</streamx-operator.version>
<streamx.version>2.0.6</streamx.version>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge client and runner versions to single release of streamx-service-mesh

pom.xml Outdated
<executions>
<execution>
<goals>
<goal>integration-test</goal>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enable running the e2e tests

import dev.streamx.cli.command.ingestion.batch.EventSourceDescriptor;
import dev.streamx.cli.command.ingestion.batch.exception.EventSourceDescriptorException;
import dev.streamx.cli.util.FileUtils;
import io.quarkus.logging.Log;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class makes jacoco coverage measurer lost - makes the whole class have 0% coverage, which may make it look unused

This also fixes the
[WARNING] Rule violated for bundle streamx-cli-core: instructions covered ratio is 0.65, but expected minimum is 0.70


public class EventSourceFileTreeWalker extends SimpleFileVisitor<Path> {

private final Logger logger = Logger.getLogger(EventSourceFileTreeWalker.class);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other classes in this project also do this

@stan-r-ds stan-r-ds force-pushed the DXP-2486-add-run-stream-and-batch-commands-for-streamx-2.0 branch from 2942184 to 16501c8 Compare December 30, 2025 09:26

@Inject
MeshDefinitionResolver uut;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test for interpolating with default value, such as:

ref: "${config.source.interpolated:inbox.pages}"

It works as expected 👍

System.setProperty("config.image.interpolated", "value");
void shouldResolveWithMandatoryAndOptionalPropertiesDefined() throws IOException {
System.setProperty("config.image.interpolated", "image-1");
System.setProperty("config.source.interpolated", "source-1");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default value is overridden here

- name: Build project
run: |
./mvnw clean verify -P all-tests
./mvnw clean verify -P all-tests -Dnative
Copy link
Contributor Author

@stan-r-ds stan-r-ds Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leverage the multi-arch feature to allow running mesh (in integration tests) also on both linux and mac host systems


private static final Set<String> COMMANDS_REQUIRING_PRINTING_BANNER =
Set.of(DevCommand.COMMAND_NAME, RunCommand.COMMAND_NAME);
Set.of(RunCommand.COMMAND_NAME);
Copy link
Contributor Author

@stan-r-ds stan-r-ds Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The not migrated (so - removed) commands are

  • dev
  • cloud (deploy/undeploy)
  • init
  • publish
  • unpublish

doRun(client);
} catch (UnsupportedChannelException e) {
throw new ParameterException(spec.commandLine(), e.getMessage());
} catch (StreamxClientConnectionException e) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those exception types are gone

} catch (StreamxClientConnectionException e) {
throw new UnableToConnectIngestionServiceException(ingestionClientConfig.url(), e);
Publisher publisher = client.newPublisher();
perform(publisher);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlined the doRun method here

state.eventSubject,
state.eventType,
state.eventSource,
state.payload // TODO rename payload to data everywhere
Copy link
Contributor Author

@stan-r-ds stan-r-ds Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think worth doing this in a separate PR, since a lot of files would get changed, making the PR too hard to read. Internally "payload" is assigned as cloud event's "data" ✅

.withDataContentType("application/json")
.withData(JsonCloudEventData.wrap(data))
.build();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed only by the batch command to create the event from event descriptor and data payload files

"subject" : "homepage.html",
"time" : "2025-12-23T10:28:23.435253Z",
"data" : {
"content" : "<h1>This is Home Page!</h1>",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works because next to this file we have .stream.properties file with content:
json.paths.to.encode.to.base64=/data/content

events: {}
outgoing:
relayed-events: {}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mesh.yaml is used only by RunCommandTest, so it must be valid and "runnable"

defaultImageTag: ${streamx.version}-jvm
descriptors:
relay:
type: "processing"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used only in interpolation unit test

cli:
outgoing:
- "pages"
- ref: "${config.source.interpolated:inbox.pages}"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test for property with default value.
Turned out it is applied

<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>compile</scope>
</dependency>
Copy link
Contributor Author

@stan-r-ds stan-r-ds Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This module is a bit weird:

  • it has tests in src/test
  • but it has test utils in src/main 🤷

That's why we can use assertj. It gives way better error messages while troubleshooting "java -jar runner.jar" commands

<directory>src/test/resources/filtered</directory>
<includes>
<include>mesh.yaml</include>
<include>overridden-nginx-conf</include>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe no need to test this feature in the CLI, it's tested at source projects

String responseBody = EntityUtils.toString(httpEntity);
assertThat(responseBody).describedAs(url).contains(expectedBody);
});
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an overload to test also binary expected body (to verify an image will be published to StreamX with unchanged content)

String responseBody = EntityUtils.toString(response.getEntity());
logger.info("Request to " + url
+ " return statusCode " + actualStatusCode
+ " and body " + responseBody);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to assertj .describedAs() as well as its descriptive error messages - we don't need to log manually anymore

.describedAs(() -> "Full output is:\n"
+ String.join("\n", process.getCurrentOutputLines())
+ "\n"
+ String.join("\n", process.getCurrentErrorLines()))
Copy link
Contributor Author

@stan-r-ds stan-r-ds Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding very helpful logs.

Until now, the tests tested that "java -jar"'s process success output logs contain expected text.

Now - when they don't - the full process output (including errors) is logged

}

private void runStreamxCommand(String command, String expectedOutput, long timeoutInS) {
private void runStreamxCommand(String command, String expectedOutput, Duration timeout) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to specify the timeout unit in parameter name, if we can use Duration object

}

@ParameterizedTest
@MethodSource("testCases")
Copy link
Contributor Author

@stan-r-ds stan-r-ds Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those were test cases for publish command, and it was decided that

  • we remove publish and unpublish
  • because no one uses them
  • and they handle just a single ingestion while the jar start time is long
  • we don't want to manage the wide variety of options described in this method

Instead, the test class was modified to verify end-to-end:

  • batch publish and unpublish
  • stream publish and unpublish

);

validateStreamxPage("ds.png",
Files.readAllBytes(Path.of("src/test/resources/batch/publish/asset/ds.png")));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The image is published to real StreamX and here we verify that when the image is downloaded via http from web sink service url - the bytes are unchanged

.map(File::toPath)
.map(Path::toAbsolutePath)
.map(Path::normalize)
.map(Path::toString)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove any "/../" from the logged path

processes.add(shellProcess);
return shellProcess;
} catch (IOException e) {
logger.info("Error running terminal command: " + command);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deduplicate log messages for success

@stan-r-ds stan-r-ds force-pushed the DXP-2486-add-run-stream-and-batch-commands-for-streamx-2.0 branch from 49665df to 0158222 Compare January 5, 2026 10:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

2 participants